package com.atguigu.udmp.stream.function;

import com.atguigu.udmp.stream.bean.UserEvent;
import com.atguigu.udmp.stream.common.util.DorisJdbcUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.util.HashSet;
import java.util.Set;

/**
 * Pass-through map function that makes sure the target partition of the
 * {@code user_event} table exists before an event continues downstream.
 *
 * <p>The set of known partition names is loaded once in {@link #open(Configuration)}
 * and then used as a per-instance cache: a partition is only created via
 * {@link DorisJdbcUtil#addPartition} the first time its name is seen by this instance.
 *
 * <p>NOTE(review): each parallel instance keeps its own cache, so two instances may
 * try to add the same partition. Whether that is harmless depends on how
 * {@code DorisJdbcUtil.addPartition} handles an already-existing partition — confirm.
 */
@Slf4j
public class CheckPartitionFunction extends RichMapFunction<UserEvent,UserEvent> {

    /** Table whose partitions are checked and created. */
    private static final String TABLE_NAME = "user_event";

    /**
     * Names of partitions known to exist. Populated in {@link #open(Configuration)};
     * marked transient because it is runtime-only state that must not be shipped
     * with the serialized function.
     */
    transient Set<String> partitionSet = null;

    /**
     * Loads the partitions that currently exist for the table into the local cache.
     *
     * @param parameters configuration passed in by the runtime (not used here)
     * @throws Exception if the partition lookup fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // Load the partitions that already exist for the table.
        Set<String> loaded = DorisJdbcUtil.showPartition(TABLE_NAME);
        // Copy into a set owned by this function: map() adds to it, so it must be
        // non-null and mutable regardless of what the helper returned.
        partitionSet = (loaded == null) ? new HashSet<>() : new HashSet<>(loaded);
        log.info("加载分区:{}", loaded);
    }

    /**
     * Ensures the partition for the event's date and event-definition id exists,
     * then returns the event unchanged.
     *
     * @param userEvent the incoming event
     * @return the same {@code userEvent} instance
     * @throws Exception if creating the partition fails
     */
    @Override
    public UserEvent map(UserEvent userEvent) throws Exception {
        String dt = userEvent.getDt();
        Long eventDefineId = userEvent.getEventDefineId();

        // Partition name has the form p_<dt without dashes>_<eventDefineId>.
        String partitionName = "p_" + dt.replace("-", "") + "_" + eventDefineId;

        if (!partitionSet.contains(partitionName)) {
            // Partition values keep the original (dashed) date plus the id as a string.
            String[] partitionValues = new String[]{dt, String.valueOf(eventDefineId)};
            DorisJdbcUtil.addPartition(TABLE_NAME, partitionName, partitionValues);
            // Remember the partition only after the add call returned normally.
            partitionSet.add(partitionName);
        }
        return userEvent;
    }
}
